#include "mavsdk_impl.h"

#include <algorithm>
#include <mutex>
#include <tcp_server_connection.h>

#include "connection.h"
#include "log.h"
#include "tcp_client_connection.h"
#include "tcp_server_connection.h"
#include "udp_connection.h"
#include "system.h"
#include "system_impl.h"
#include "serial_connection.h"
#include "version.h"
#include "server_component_impl.h"
#include "overloaded.h"
#include "mavlink_channels.h"
#include "callback_list.tpp"
#include "hostname_to_ip.h"
#include "embedded_mavlink_xml.h"
#include <mav/MessageSet.h>

namespace mavsdk {

// Explicit instantiation of CallbackList with an empty template argument list.
// NOTE(review): presumably needed because the template is defined in
// callback_list.tpp (included above) rather than in a header - confirm.
template class CallbackList<>;

// Builds the implementation object.
//
// Steps, in order: log the version, read the three debugging switches from the
// environment, apply the initial configuration, load the embedded MAVLink XML
// dialects, create the buffer parser, and start the two background threads.
//
// @param configuration Initial configuration, handed to set_configuration().
MavsdkImpl::MavsdkImpl(const Mavsdk::Configuration& configuration) :
    timeout_handler(time),
    call_every_handler(time)
{
    LogInfo() << "MAVSDK version: " << mavsdk_version;

    // A switch only counts as enabled when its variable is set to exactly "1".
    const auto env_switch_enabled = [](const char* variable_name) {
        const char* raw_value = std::getenv(variable_name);
        return raw_value != nullptr && std::string(raw_value) == "1";
    };

    if (env_switch_enabled("MAVSDK_CALLBACK_DEBUGGING")) {
        LogDebug() << "Callback debugging is on.";
        _callback_debugging = true;
    }

    if (env_switch_enabled("MAVSDK_MESSAGE_DEBUGGING")) {
        LogDebug() << "Message debugging is on.";
        _message_logging_on = true;
    }

    if (env_switch_enabled("MAVSDK_SYSTEM_DEBUGGING")) {
        LogDebug() << "System debugging is on.";
        _system_debugging = true;
    }

    set_configuration(configuration);

    // Load the embedded XML definitions in dependency order. No connection exists
    // yet at this point, so nothing else can be touching the message set.
    _message_set = std::make_unique<mav::MessageSet>();
    _message_set->addFromXMLString(mav_embedded::MINIMAL_XML);
    _message_set->addFromXMLString(mav_embedded::STANDARD_XML);
    _message_set->addFromXMLString(mav_embedded::COMMON_XML);
    _message_set->addFromXMLString(mav_embedded::ARDUPILOTMEGA_XML);

    // The parser works on top of the message set populated above.
    _buffer_parser = std::make_unique<mav::BufferParser>(*_message_set);

    // The user callback thread goes first so that it is already running when the
    // work thread starts producing callbacks.
    _process_user_callbacks_thread =
        new std::thread(&MavsdkImpl::process_user_callbacks_thread, this);

    _work_thread = new std::thread(&MavsdkImpl::work_thread, this);
}

// Tears the object down: removes the heartbeat entry from the call-every
// handler, raises the exit flag, joins the work thread and then the user
// callback thread, and finally clears all systems and connections.
MavsdkImpl::~MavsdkImpl()
{
    {
        // Remove the heartbeat cookie while holding the heartbeat mutex.
        std::lock_guard<std::mutex> lock(_heartbeat_mutex);
        call_every_handler.remove(_heartbeat_send_cookie);
    }

    // The message processing functions in this file check this flag and bail out.
    _should_exit = true;

    // Stop work first because we don't want to trigger anything that would
    // potentially want to call into user code.

    if (_work_thread != nullptr) {
        _work_thread->join();
        delete _work_thread;
        _work_thread = nullptr;
    }

    if (_process_user_callbacks_thread != nullptr) {
        // Stop the queue before joining.
        // NOTE(review): presumably this wakes a thread blocked on the queue - confirm.
        _user_callback_queue.stop();
        _process_user_callbacks_thread->join();
        delete _process_user_callbacks_thread;
        _process_user_callbacks_thread = nullptr;
    }

    // Both threads are joined at this point; drop systems and connections under
    // the main mutex.
    std::lock_guard lock(_mutex);

    _systems.clear();
    _connections.clear();
}

// Returns the library version string.
//
// Calls number 10 through 17 (counted across the whole process) return one line
// each of a short history of the library's name instead of the version.
std::string MavsdkImpl::version()
{
    // One entry per call, starting with call number `first_story_call`.
    static const char* const name_history[] = {
        "You were wondering about the name of this library?",
        "Let's look at the history:",
        "DroneLink",
        "DroneCore",
        "DronecodeSDK",
        "MAVSDK",
        "And that's it...",
        "At least for now ¯\\_(ツ)_/¯."};

    constexpr unsigned first_story_call = 10;
    constexpr unsigned story_length = sizeof(name_history) / sizeof(name_history[0]);

    static unsigned calls_so_far = 0;
    const unsigned call_number = ++calls_so_far;

    const bool inside_story =
        call_number >= first_story_call && (call_number - first_story_call) < story_length;
    if (inside_story) {
        return name_history[call_number - first_story_call];
    }

    return mavsdk_version;
}

std::vector<std::shared_ptr<System>> MavsdkImpl::systems() const
{
    std::vector<std::shared_ptr<System>> systems_result{};

    std::lock_guard lock(_mutex);
    for (auto& system : _systems) {
        // We ignore the 0 entry because it's just a null system.
        // It's only created because the older, deprecated API needs a
        // reference.
        if (system.first == 0) {
            continue;
        }
        systems_result.push_back(system.second);
    }

    return systems_result;
}

std::optional<std::shared_ptr<System>> MavsdkImpl::first_autopilot(double timeout_s)
{
    {
        std::lock_guard lock(_mutex);
        for (auto system : _systems) {
            if (system.second->is_connected() && system.second->has_autopilot()) {
                return system.second;
            }
        }
    }

    if (timeout_s == 0.0) {
        // Don't wait at all.
        return {};
    }

    auto prom = std::make_shared<std::promise<std::shared_ptr<System>>>();
    auto fut = prom->get_future();

    auto flag = std::make_shared<std::once_flag>();
    auto handle = subscribe_on_new_system([this, prom, flag]() {
        // Check all systems, not just the first one
        auto all_systems = systems();
        for (auto& system : all_systems) {
            if (system->is_connected() && system->has_autopilot()) {
                std::call_once(*flag, [prom, system]() { prom->set_value(system); });
                break;
            }
        }
    });

    if (timeout_s > 0.0) {
        if (fut.wait_for(std::chrono::milliseconds(int64_t(timeout_s * 1e3))) ==
            std::future_status::ready) {
            unsubscribe_on_new_system(handle);
            return fut.get();

        } else {
            unsubscribe_on_new_system(handle);
            return std::nullopt;
        }
    } else {
        fut.wait();
        unsubscribe_on_new_system(handle);
        return std::optional(fut.get());
    }
}

// Returns the server component matching the configured component type.
//
// Holds _mutex for the whole call, including the lookup.
//
// @param instance Instance number, passed on to server_component_by_type().
// @return The server component, or an empty pointer for an unknown type.
std::shared_ptr<ServerComponent> MavsdkImpl::server_component(unsigned instance)
{
    std::lock_guard lock(_mutex);

    const auto configured_type = _configuration.get_component_type();

    bool is_known_type = false;
    switch (configured_type) {
        case ComponentType::Autopilot:
        case ComponentType::GroundStation:
        case ComponentType::CompanionComputer:
        case ComponentType::Camera:
        case ComponentType::Gimbal:
        case ComponentType::RemoteId:
        case ComponentType::Custom:
            is_known_type = true;
            break;
        default:
            break;
    }

    if (!is_known_type) {
        LogErr() << "Unknown component type";
        return {};
    }

    return server_component_by_type(configured_type, instance);
}

// Maps a component type plus instance number to a MAVLink component ID and
// returns the server component for that ID.
//
// Supported here: autopilot (instance 0), ground station (instance 0),
// companion computer (instances 0..3) and camera (instances 0..5). Any other
// type or instance logs an error and yields an empty pointer.
std::shared_ptr<ServerComponent>
MavsdkImpl::server_component_by_type(ComponentType server_component_type, unsigned instance)
{
    switch (server_component_type) {
        case ComponentType::Autopilot:
            if (instance != 0) {
                LogErr() << "Only autopilot instance 0 is valid";
                return {};
            }
            return server_component_by_id(MAV_COMP_ID_AUTOPILOT1);

        case ComponentType::GroundStation:
            if (instance != 0) {
                LogErr() << "Only one ground station supported at this time";
                return {};
            }
            return server_component_by_id(MAV_COMP_ID_MISSIONPLANNER);

        case ComponentType::CompanionComputer:
            switch (instance) {
                case 0:
                    return server_component_by_id(MAV_COMP_ID_ONBOARD_COMPUTER);
                case 1:
                    return server_component_by_id(MAV_COMP_ID_ONBOARD_COMPUTER2);
                case 2:
                    return server_component_by_id(MAV_COMP_ID_ONBOARD_COMPUTER3);
                case 3:
                    return server_component_by_id(MAV_COMP_ID_ONBOARD_COMPUTER4);
                default:
                    LogErr() << "Only companion computer 0..3 are supported";
                    return {};
            }

        case ComponentType::Camera:
            switch (instance) {
                case 0:
                    return server_component_by_id(MAV_COMP_ID_CAMERA);
                case 1:
                    return server_component_by_id(MAV_COMP_ID_CAMERA2);
                case 2:
                    return server_component_by_id(MAV_COMP_ID_CAMERA3);
                case 3:
                    return server_component_by_id(MAV_COMP_ID_CAMERA4);
                case 4:
                    return server_component_by_id(MAV_COMP_ID_CAMERA5);
                case 5:
                    return server_component_by_id(MAV_COMP_ID_CAMERA6);
                default:
                    LogErr() << "Only camera 0..5 are supported";
                    return {};
            }

        default:
            LogErr() << "Unknown server component type";
            return {};
    }
}

// Returns the server component for a MAVLink component ID, taking
// _server_components_mutex for the lookup.
//
// @param component_id Component ID; 0 is rejected with an error log.
// @return The server component, or nullptr for component ID 0.
std::shared_ptr<ServerComponent> MavsdkImpl::server_component_by_id(uint8_t component_id)
{
    if (component_id != 0) {
        std::lock_guard lock(_server_components_mutex);
        return server_component_by_id_with_lock(component_id);
    }

    LogErr() << "Server component with component ID 0 not allowed";
    return nullptr;
}

// Returns the server component registered for `component_id`, creating it if
// it does not exist yet.
//
// The caller must already hold _server_components_mutex.
//
// An existing entry whose pointer is null is filled in place and returned.
// Previously that case fell through to the append below, which left two
// entries with the same component ID in the list.
//
// @param component_id MAVLink component ID to look up.
// @return The existing or newly created server component.
std::shared_ptr<ServerComponent> MavsdkImpl::server_component_by_id_with_lock(uint8_t component_id)
{
    for (auto& it : _server_components) {
        if (it.first != component_id) {
            continue;
        }

        if (it.second == nullptr) {
            // Reuse the existing slot rather than appending a duplicate.
            it.second = std::make_shared<ServerComponent>(*this, component_id);
        }
        return it.second;
    }

    // No entry for this ID yet: append a fresh one.
    _server_components.emplace_back(std::pair<uint8_t, std::shared_ptr<ServerComponent>>(
        component_id, std::make_shared<ServerComponent>(*this, component_id)));

    return _server_components.back().second;
}

// Forwards a received message to the other connections, following the MAVLink
// routing rules (see https://mavlink.io/en/guide/routing.html).
//
// A message is not forwarded when it targets exactly our own system and
// component ID. Send failures are reported to the connection error
// subscribers; an error is logged when no connection accepted the message.
//
// @param message    The message to forward.
// @param connection The connection the message arrived on; it is skipped.
void MavsdkImpl::forward_message(mavlink_message_t& message, Connection* connection)
{
    // Heartbeat forwarding is currently always on.
    const bool forward_heartbeats_enabled = true;

    const uint8_t target_system_id = get_target_system_id(message);
    const uint8_t target_component_id = get_target_component_id(message);

    // Messages addressed only to us stay here.
    const bool targeted_only_at_us =
        (target_system_id == get_own_system_id() && target_component_id == get_own_component_id());

    // Heartbeats are only forwarded while heartbeat forwarding is enabled.
    const bool is_heartbeat = (message.msgid == MAVLINK_MSG_ID_HEARTBEAT);
    const bool heartbeat_blocked = is_heartbeat && !forward_heartbeats_enabled;

    if (targeted_only_at_us || heartbeat_blocked) {
        return;
    }

    unsigned successful_emissions = 0;
    for (auto& entry : _connections) {
        // Never echo back to the originating connection, and skip connections
        // that are not set up for forwarding.
        const bool is_origin = (entry.connection.get() == connection);
        if (is_origin || !entry.connection->should_forward_messages()) {
            continue;
        }

        auto result = entry.connection->send_message(message);
        if (!result.first) {
            _connections_errors_subscriptions.queue(
                Mavsdk::ConnectionError{result.second, entry.handle},
                [this](const auto& func) { call_user_callback(func); });
            continue;
        }
        ++successful_emissions;
    }

    if (successful_emissions == 0) {
        LogErr() << "Message forwarding failed";
    }
}

// Queues a received message together with its originating connection and
// notifies one waiter on the received-messages condition variable.
void MavsdkImpl::receive_message(mavlink_message_t& message, Connection* connection)
{
    std::unique_lock queue_lock(_received_messages_mutex);
    _received_messages.emplace(ReceivedMessage{std::move(message), connection});
    // Release the mutex before notifying.
    queue_lock.unlock();

    _received_messages_cv.notify_one();
}

// Queues a received libmav message together with its originating connection
// and notifies one waiter on the libmav condition variable.
void MavsdkImpl::receive_libmav_message(
    const Mavsdk::MavlinkMessage& message, Connection* connection)
{
    std::unique_lock queue_lock(_received_libmav_messages_mutex);
    _received_libmav_messages.emplace(ReceivedLibmavMessage{message, connection});
    // Release the mutex before notifying.
    queue_lock.unlock();

    _received_libmav_messages_cv.notify_one();
}

// Drains the queue of received messages, processing each one in order.
// _received_messages_mutex is held for the whole drain.
void MavsdkImpl::process_messages()
{
    std::lock_guard lock(_received_messages_mutex);

    for (; !_received_messages.empty(); _received_messages.pop()) {
        // Work on a copy of the queue entry.
        auto pending = _received_messages.front();
        process_message(pending.message, pending.connection_ptr);
    }
}

// Drains the queue of received libmav messages, processing each one in order.
// _received_libmav_messages_mutex is held for the whole drain.
void MavsdkImpl::process_libmav_messages()
{
    std::lock_guard lock(_received_libmav_messages_mutex);

    for (; !_received_libmav_messages.empty(); _received_libmav_messages.pop()) {
        // Work on a copy of the queue entry.
        auto pending = _received_libmav_messages.front();
        process_libmav_message(pending.message, pending.connection_ptr);
    }
}

// Handles one received mavlink_message_t.
//
// In order: optionally forwards the message to other connections, runs the
// incoming intercept callback (which may modify or drop the message), filters
// out senders that must not create a system, makes sure a system entry exists
// for the sender, and finally passes the message to the generic message
// handler and to the system with the matching system ID.
//
// @param message    The received message; the intercept callback may modify it.
// @param connection The connection the message arrived on.
void MavsdkImpl::process_message(mavlink_message_t& message, Connection* connection)
{
    // Assumes _received_messages_mutex

    if (_message_logging_on) {
        LogDebug() << "Processing message " << message.msgid << " from "
                   << static_cast<int>(message.sysid) << "/" << static_cast<int>(message.compid);
    }

    if (_should_exit) {
        // If we're meant to clean up, let's not try to acquire any more locks but bail.
        return;
    }

    {
        // Everything in this scope runs with _mutex held.
        std::lock_guard lock(_mutex);

        /** @note: Forward message FIRST (before intercept) if option is enabled and multiple
         * interfaces are connected. This ensures that forwarded messages are not affected by
         * intercept modifications. Performs message forwarding checks for every messages if message
         * forwarding is enabled on at least one connection, and in case of a single forwarding
         * connection, we check that it is not the one which received the current message.
         *
         * Conditions:
         * 1. At least 2 connections.
         * 2. At least 1 forwarding connection.
         * 3. At least 2 forwarding connections or current connection is not forwarding.
         */

        if (_connections.size() > 1 && mavsdk::Connection::forwarding_connections_count() > 0 &&
            (mavsdk::Connection::forwarding_connections_count() > 1 ||
             !connection->should_forward_messages())) {
            if (_message_logging_on) {
                LogDebug() << "Forwarding message " << message.msgid << " from "
                           << static_cast<int>(message.sysid) << "/"
                           << static_cast<int>(message.compid);
            }
            forward_message(message, connection);
        }

        if (_should_exit) {
            // If we're meant to clean up, let's not try to acquire any more locks but bail.
            return;
        }

        // This is a low level interface where incoming messages can be tampered
        // with or even dropped FOR LOCAL PROCESSING ONLY (after forwarding).
        {
            bool keep = true;
            {
                // The callback is invoked while holding the intercept mutex.
                std::lock_guard<std::mutex> intercept_lock(_intercept_callbacks_mutex);
                if (_intercept_incoming_messages_callback != nullptr) {
                    keep = _intercept_incoming_messages_callback(message);
                }
            }

            if (!keep) {
                LogDebug() << "Dropped incoming message: " << int(message.msgid);
                return;
            }
        }

        // Don't ever create a system with sysid 0.
        if (message.sysid == 0) {
            if (_message_logging_on) {
                LogDebug() << "Ignoring message with sysid == 0";
            }
            return;
        }

        // Filter out messages by QGroundControl, however, only do that if MAVSDK
        // is also implementing a ground station and not if it is used in another
        // configuration, e.g. on a companion.
        //
        // This is a workaround because PX4 started forwarding messages between
        // mavlink instances which leads to existing implementations (including
        // examples and integration tests) to connect to QGroundControl by accident
        // instead of PX4 because the check `has_autopilot()` is not used.

        if (_configuration.get_component_type() == ComponentType::GroundStation &&
            message.sysid == 255 && message.compid == MAV_COMP_ID_MISSIONPLANNER) {
            if (_message_logging_on) {
                LogDebug() << "Ignoring messages from QGC as we are also a ground station";
            }
            return;
        }

        // Register the sending component with its system if that system is known.
        bool found_system = false;
        for (auto& system : _systems) {
            if (system.first == message.sysid) {
                system.second->system_impl()->add_new_component(message.compid);
                found_system = true;
                break;
            }
        }

        if (!found_system) {
            if (_system_debugging) {
                LogWarn() << "Create new system/component " << (int)message.sysid << "/"
                          << (int)message.compid;
                LogWarn() << "From message " << (int)message.msgid << " with len "
                          << (int)message.len;
                // Dump the first 12 + len bytes of the message struct as decimal values.
                std::string bytes = "";
                for (unsigned i = 0; i < 12 + message.len; ++i) {
                    bytes += std::to_string(reinterpret_cast<uint8_t*>(&message)[i]) + ' ';
                }
                LogWarn() << "Bytes: " << bytes;
            }
            make_system_with_component(message.sysid, message.compid);

            // We now better talk back.
            start_sending_heartbeats();
        }

        if (_should_exit) {
            // Don't try to call at() if systems have already been destroyed
            // in destructor.
            return;
        }
    }

    // From here on _mutex is no longer held.
    mavlink_message_handler.process_message(message);

    // Hand the message to the first system whose ID matches the sender.
    for (auto& system : _systems) {
        if (system.first == message.sysid) {
            system.second->system_impl()->process_mavlink_message(message);
            break;
        }
    }
}

// Handles one received libmav message.
//
// In order: runs the incoming JSON interception callbacks (which may drop the
// message), filters out senders that must not create a system, makes sure a
// system entry exists for the sender, and passes the message to every system
// entry with a matching system ID.
//
// System IDs are cast to int before being logged, as in the other log
// statements of this file, so that an 8-bit ID is not printed as a character.
//
// @param message The received libmav message.
void MavsdkImpl::process_libmav_message(
    const Mavsdk::MavlinkMessage& message, Connection* /* connection */)
{
    // Assumes _received_libmav_messages_mutex

    if (_message_logging_on) {
        LogDebug() << "MavsdkImpl::process_libmav_message: " << message.message_name << " from "
                   << static_cast<int>(message.system_id) << "/"
                   << static_cast<int>(message.component_id);
    }

    // JSON message interception for incoming messages
    if (!call_json_interception_callbacks(message, _incoming_json_message_subscriptions)) {
        // Message was dropped by interception callback
        if (_message_logging_on) {
            LogDebug() << "Incoming JSON message " << message.message_name
                       << " dropped by interception";
        }
        return;
    }

    if (_message_logging_on) {
        LogDebug() << "Processing libmav message " << message.message_name << " from "
                   << static_cast<int>(message.system_id) << "/"
                   << static_cast<int>(message.component_id);
    }

    if (_should_exit) {
        // If we're meant to clean up, let's not try to acquire any more locks but bail.
        return;
    }

    {
        // Everything in this scope runs with _mutex held.
        std::lock_guard lock(_mutex);

        // Don't ever create a system with sysid 0.
        if (message.system_id == 0) {
            if (_message_logging_on) {
                LogDebug() << "Ignoring libmav message with sysid == 0";
            }
            return;
        }

        // Filter out QGroundControl messages similar to regular mavlink processing
        if (_configuration.get_component_type() == ComponentType::GroundStation &&
            message.system_id == 255 && message.component_id == MAV_COMP_ID_MISSIONPLANNER) {
            if (_message_logging_on) {
                LogDebug() << "Ignoring libmav messages from QGC as we are also a ground station";
            }
            return;
        }

        // Register the sending component with its system if that system is known.
        bool found_system = false;
        for (auto& system : _systems) {
            if (system.first == message.system_id) {
                system.second->system_impl()->add_new_component(message.component_id);
                found_system = true;
                break;
            }
        }

        if (!found_system) {
            if (_system_debugging) {
                LogWarn() << "Create new system/component from libmav " << (int)message.system_id
                          << "/" << (int)message.component_id;
            }
            make_system_with_component(message.system_id, message.component_id);

            // We now better talk back.
            start_sending_heartbeats();
        }

        if (_should_exit) {
            // Don't try to call at() if systems have already been destroyed
            // in destructor.
            return;
        }
    }

    // Distribute libmav message to systems for libmav-specific handling
    bool found_system = false;
    for (auto& system : _systems) {
        if (system.first == message.system_id) {
            if (_message_logging_on) {
                LogDebug() << "Distributing libmav message " << message.message_name
                           << " to SystemImpl for system " << static_cast<int>(system.first);
            }
            system.second->system_impl()->process_libmav_message(message);
            found_system = true;
            // Don't break - distribute to all matching system instances
        }
    }

    if (!found_system) {
        LogWarn() << "No system found for libmav message " << message.message_name
                  << " from system " << static_cast<int>(message.system_id);
    }
}

// Queues a copy of the message for delivery by deliver_messages().
//
// @param message The message to send; it is copied into the queue.
// @return Always true: the message is only queued here, not sent.
bool MavsdkImpl::send_message(mavlink_message_t& message)
{
    {
        std::lock_guard lock(_messages_to_send_mutex);
        // The queue keeps its own copy, so the caller's message may go away.
        _messages_to_send.push(message);
    }

    // For heartbeats, yield so that other threads get a chance to run sooner.
    // This is only a hint to speed up system discovery and never blocks.
    const bool is_heartbeat = (message.msgid == MAVLINK_MSG_ID_HEARTBEAT);
    if (is_heartbeat) {
        std::this_thread::yield();
    }

    return true;
}

void MavsdkImpl::deliver_messages()
{
    // Process messages one at a time to avoid holding the mutex while delivering
    while (true) {
        mavlink_message_t message;
        {
            std::lock_guard lock(_messages_to_send_mutex);
            if (_messages_to_send.empty()) {
                break;
            }
            message = _messages_to_send.front();
            _messages_to_send.pop();
        }
        deliver_message(message);
    }
}

// Sends one outgoing message on the matching connections.
//
// In order: runs the outgoing intercept callback (which may modify or drop the
// message), re-parses the message for the JSON interception callbacks (which
// may drop it as well), then sends it on every connection that knows the
// target system. A target system ID of 0 sends on all connections.
//
// Send failures are reported to the connection error subscribers; an error is
// logged when no connection accepted the message.
//
// @param message The message to send; the intercept callback may modify it.
void MavsdkImpl::deliver_message(mavlink_message_t& message)
{
    if (_message_logging_on) {
        LogDebug() << "Sending message " << message.msgid << " from "
                   << static_cast<int>(message.sysid) << "/" << static_cast<int>(message.compid)
                   << " to " << static_cast<int>(get_target_system_id(message)) << "/"
                   << static_cast<int>(get_target_component_id(message));
    }

    // This is a low level interface where outgoing messages can be tampered
    // with or even dropped.
    bool keep = true;
    {
        std::lock_guard<std::mutex> lock(_intercept_callbacks_mutex);
        if (_intercept_outgoing_messages_callback != nullptr) {
            keep = _intercept_outgoing_messages_callback(message);
        }
    }

    if (!keep) {
        // We fake that everything was sent as instructed because
        // a potential loss would happen later, and we would not be informed
        // about it.
        LogDebug() << "Dropped outgoing message: " << int(message.msgid);
        return;
    }

    // JSON message interception for outgoing messages
    // Convert mavlink_message_t to Mavsdk::MavlinkMessage for JSON interception
    uint8_t buffer[MAVLINK_MAX_PACKET_LEN];
    uint16_t len = mavlink_msg_to_send_buffer(buffer, &message);

    size_t bytes_consumed = 0;
    auto libmav_msg_opt = parse_message_safe(buffer, len, bytes_consumed);

    // If parsing fails, JSON interception is skipped and the message is still sent.
    if (libmav_msg_opt) {
        // Create Mavsdk::MavlinkMessage directly for JSON interception
        Mavsdk::MavlinkMessage json_message;
        json_message.message_name = libmav_msg_opt.value().name();
        json_message.system_id = message.sysid;
        json_message.component_id = message.compid;

        // Extract target_system and target_component if present
        uint8_t target_system_id = 0;
        uint8_t target_component_id = 0;
        if (libmav_msg_opt.value().get("target_system", target_system_id) ==
            mav::MessageResult::Success) {
            json_message.target_system_id = target_system_id;
        } else {
            json_message.target_system_id = 0;
        }
        if (libmav_msg_opt.value().get("target_component", target_component_id) ==
            mav::MessageResult::Success) {
            json_message.target_component_id = target_component_id;
        } else {
            json_message.target_component_id = 0;
        }

        // Generate JSON using LibmavReceiver's public method
        auto connections = get_connections();
        if (!connections.empty() && connections[0]->get_libmav_receiver()) {
            json_message.fields_json =
                connections[0]->get_libmav_receiver()->libmav_message_to_json(
                    libmav_msg_opt.value());
        } else {
            // Fallback: create minimal JSON if no receiver available
            json_message.fields_json =
                "{\"message_id\":" + std::to_string(libmav_msg_opt.value().id()) +
                ",\"message_name\":\"" + libmav_msg_opt.value().name() + "\"}";
        }

        if (!call_json_interception_callbacks(json_message, _outgoing_json_message_subscriptions)) {
            // Message was dropped by JSON interception callback
            if (_message_logging_on) {
                LogDebug() << "Outgoing JSON message " << json_message.message_name
                           << " dropped by interception";
            }
            return;
        }
    }

    std::lock_guard lock(_mutex);

    if (_connections.empty()) {
        // We obviously can't send any messages without a connection added, so
        // we silently ignore this.
        return;
    }

    // The target does not change while iterating, so it is looked up only once.
    const uint8_t destination_system_id = get_target_system_id(message);

    // Plain unsigned, as in forward_message(): an 8-bit counter would wrap back
    // to 0 after 256 successful sends and trigger the error log below.
    unsigned successful_emissions = 0;
    for (auto& entry : _connections) {
        // A non-zero target is only sent on connections that know that system.
        if (destination_system_id != 0 &&
            !entry.connection->has_system_id(destination_system_id)) {
            continue;
        }

        const auto result = entry.connection->send_message(message);
        if (result.first) {
            ++successful_emissions;
        } else {
            _connections_errors_subscriptions.queue(
                Mavsdk::ConnectionError{result.second, entry.handle},
                [this](const auto& func) { call_user_callback(func); });
        }
    }

    if (successful_emissions == 0) {
        LogErr() << "Sending message failed";
    }
}

std::pair<ConnectionResult, Mavsdk::ConnectionHandle> MavsdkImpl::add_any_connection(
    const std::string& connection_url, ForwardingOption forwarding_option)
{
    CliArg cli_arg;
    if (!cli_arg.parse(connection_url)) {
        return {ConnectionResult::ConnectionUrlInvalid, Mavsdk::ConnectionHandle{}};
    }

    return std::visit(
        overloaded{
            [](std::monostate) {
                // Should not happen anyway.
                return std::pair<ConnectionResult, Mavsdk::ConnectionHandle>{
                    ConnectionResult::ConnectionUrlInvalid, Mavsdk::ConnectionHandle()};
            },
            [this, forwarding_option](const CliArg::Udp& udp) {
                return add_udp_connection(udp, forwarding_option);
            },
            [this, forwarding_option](const CliArg::Tcp& tcp) {
                return add_tcp_connection(tcp, forwarding_option);
            },
            [this, forwarding_option](const CliArg::Serial& serial) {
                return add_serial_connection(
                    serial.path, serial.baudrate, serial.flow_control_enabled, forwarding_option);
            }},
        cli_arg.protocol);
}

// Creates, starts and registers a UDP connection.
//
// In `In` mode the socket is set up with the given host and port. Otherwise it
// is set up with "0.0.0.0" and port 0. In `Out` mode the given host is resolved
// and added as a remote, and sending heartbeats is switched on.
//
// @param udp               Parsed UDP arguments (mode, host, port).
// @param forwarding_option Passed through to the connection.
// @return The result plus the handle of the new connection; the handle is
//         default-constructed on failure.
std::pair<ConnectionResult, Mavsdk::ConnectionHandle>
MavsdkImpl::add_udp_connection(const CliArg::Udp& udp, ForwardingOption forwarding_option)
{
    const bool is_listening = (udp.mode == CliArg::Udp::Mode::In);

    auto connection = std::make_unique<UdpConnection>(
        [this](mavlink_message_t& received, Connection* origin) {
            receive_message(received, origin);
        },
        [this](const Mavsdk::MavlinkMessage& received, Connection* origin) {
            receive_libmav_message(received, origin);
        },
        *this, // MavsdkImpl reference, used for thread-safe MessageSet access
        is_listening ? udp.host : "0.0.0.0",
        is_listening ? udp.port : 0,
        forwarding_option);

    if (connection == nullptr) {
        return {ConnectionResult::ConnectionError, Mavsdk::ConnectionHandle{}};
    }

    const ConnectionResult start_result = connection->start();
    if (start_result != ConnectionResult::Success) {
        return {start_result, Mavsdk::ConnectionHandle{}};
    }

    if (udp.mode == CliArg::Udp::Mode::Out) {
        // Register the remote by IP, not by hostname. Otherwise there would be
        // two remotes: one for the IP and one for the hostname.
        auto resolved_ip = resolve_hostname_to_ip(udp.host);
        if (!resolved_ip) {
            return {ConnectionResult::DestinationIpUnknown, Mavsdk::ConnectionHandle{}};
        }
        connection->add_remote_to_keep(resolved_ip.value(), udp.port);

        std::lock_guard lock(_mutex);

        // With a UDP remote it is up to us to initiate the link, which is done
        // by sending heartbeats.
        auto heartbeat_configuration = get_configuration();
        heartbeat_configuration.set_always_send_heartbeats(true);
        set_configuration(heartbeat_configuration);
    }

    return {ConnectionResult::Success, add_connection(std::move(connection))};
}

// Creates, starts and registers a TCP connection: a client connection in `Out`
// mode, a server connection otherwise.
//
// @param tcp               Parsed TCP arguments (mode, host, port).
// @param forwarding_option Passed through to the connection.
// @return The result of start() plus the handle of the new connection; the
//         handle is default-constructed on failure.
std::pair<ConnectionResult, Mavsdk::ConnectionHandle>
MavsdkImpl::add_tcp_connection(const CliArg::Tcp& tcp, ForwardingOption forwarding_option)
{
    using ConnectionOutcome = std::pair<ConnectionResult, Mavsdk::ConnectionHandle>;

    // Both connection types feed received messages into the same two queues.
    const auto on_mavlink_message = [this](mavlink_message_t& received, Connection* origin) {
        receive_message(received, origin);
    };
    const auto on_libmav_message = [this](
                                       const Mavsdk::MavlinkMessage& received,
                                       Connection* origin) {
        receive_libmav_message(received, origin);
    };

    // Starts the connection and registers it only if the start succeeded.
    const auto start_and_register = [this](auto connection) -> ConnectionOutcome {
        if (!connection) {
            return {ConnectionResult::ConnectionError, Mavsdk::ConnectionHandle{}};
        }
        const ConnectionResult start_result = connection->start();
        if (start_result != ConnectionResult::Success) {
            return {start_result, Mavsdk::ConnectionHandle{}};
        }
        return {start_result, add_connection(std::move(connection))};
    };

    if (tcp.mode == CliArg::Tcp::Mode::Out) {
        return start_and_register(std::make_unique<TcpClientConnection>(
            on_mavlink_message,
            on_libmav_message,
            *this, // MavsdkImpl reference, used for thread-safe MessageSet access
            tcp.host,
            tcp.port,
            forwarding_option));
    }

    return start_and_register(std::make_unique<TcpServerConnection>(
        on_mavlink_message,
        on_libmav_message,
        *this, // MavsdkImpl reference, used for thread-safe MessageSet access
        tcp.host,
        tcp.port,
        forwarding_option));
}

std::pair<ConnectionResult, Mavsdk::ConnectionHandle> MavsdkImpl::add_serial_connection(
    const std::string& dev_path,
    int baudrate,
    bool flow_control,
    ForwardingOption forwarding_option)
{
    auto new_conn = std::make_unique<SerialConnection>(
        [this](mavlink_message_t& message, Connection* connection) {
            receive_message(message, connection);
        },
        [this](const Mavsdk::MavlinkMessage& message, Connection* connection) {
            receive_libmav_message(message, connection);
        },
        *this, // Pass MavsdkImpl reference for thread-safe MessageSet access
        dev_path,
        baudrate,
        flow_control,
        forwarding_option);
    if (!new_conn) {
        return {ConnectionResult::ConnectionError, Mavsdk::ConnectionHandle{}};
    }
    ConnectionResult ret = new_conn->start();
    if (ret == ConnectionResult::Success) {
        auto handle = add_connection(std::move(new_conn));

        auto new_configuration = get_configuration();

        // PX4 starting with v1.13 does not send heartbeats by default, so we need
        // to initiate the MAVLink connection by sending heartbeats.
        // Therefore, we override the default here and enable sending heartbeats.
        new_configuration.set_always_send_heartbeats(true);
        set_configuration(new_configuration);

        return {ret, handle};

    } else {
        return {ret, Mavsdk::ConnectionHandle{}};
    }
}

// Takes ownership of a connection and stores it with a newly created handle.
//
// @param new_connection The connection to store.
// @return The handle under which the connection was stored.
Mavsdk::ConnectionHandle MavsdkImpl::add_connection(std::unique_ptr<Connection>&& new_connection)
{
    std::lock_guard lock(_mutex);

    auto new_handle = _connections_handle_factory.create();
    _connections.push_back(ConnectionEntry{std::move(new_connection), new_handle});
    return new_handle;
}

// Removes every stored connection entry whose handle equals `handle`.
// Does nothing if no entry matches.
void MavsdkImpl::remove_connection(Mavsdk::ConnectionHandle handle)
{
    std::lock_guard lock(_mutex);

    // Erase-remove idiom: the second iterator is required. The single-iterator
    // erase() overload would remove only one element and must not be called
    // with end(), which is what remove_if returns when nothing matches.
    _connections.erase(
        std::remove_if(
            _connections.begin(),
            _connections.end(),
            [&handle](auto&& entry) { return (entry.handle == handle); }),
        _connections.end());
}

// Returns a copy of the current configuration, taken while holding _mutex.
Mavsdk::Configuration MavsdkImpl::get_configuration() const
{
    std::lock_guard guard(_mutex);

    Mavsdk::Configuration snapshot = _configuration;
    return snapshot;
}

// Applies `new_configuration`:
//  - repoints the default server component to the one for the new component ID,
//  - stores the configuration and refreshes the cached system/component IDs,
//  - starts heartbeats when always-send-heartbeats got switched on, or stops
//    them when it got switched off and no system is connected.
void MavsdkImpl::set_configuration(Mavsdk::Configuration new_configuration)
{
    std::lock_guard server_components_lock(_server_components_mutex);
    // We just point the default to the newly created component. This means
    // that the previous default component will be deleted if it is not
    // used/referenced anywhere.
    _default_server_component =
        server_component_by_id_with_lock(new_configuration.get_component_id());

    const bool previously_always_sending = _configuration.get_always_send_heartbeats();
    const bool now_always_sending = new_configuration.get_always_send_heartbeats();

    // Store the new configuration *before* starting/stopping heartbeats:
    // stop_sending_heartbeats() only stops when _configuration says that
    // always-send is off, so with the old value still in place the stop
    // request below would silently be ignored.
    _configuration = new_configuration;
    // We cache these values as atomic to avoid having to lock any mutex for them.
    _our_system_id = new_configuration.get_system_id();
    _our_component_id = new_configuration.get_component_id();

    if (now_always_sending && !previously_always_sending) {
        start_sending_heartbeats();
    } else if (!now_always_sending && previously_always_sending && !is_any_system_connected()) {
        stop_sending_heartbeats();
    }
}

uint8_t MavsdkImpl::get_own_system_id() const
{
    return _our_system_id;
}

uint8_t MavsdkImpl::get_own_component_id() const
{
    return _our_component_id;
}

// Returns the channel; currently hard-coded to 0.
uint8_t MavsdkImpl::channel() const
{
    // TODO
    constexpr uint8_t fixed_channel = 0;
    return fixed_channel;
}

// Returns the autopilot type; currently hard-coded to Autopilot::Px4.
Autopilot MavsdkImpl::autopilot() const
{
    // TODO
    constexpr Autopilot fixed_autopilot = Autopilot::Px4;
    return fixed_autopilot;
}

// Returns the MAV type stored in the current configuration.
// FIXME: this should be per component
// NOTE(review): _configuration is read here without taking _mutex, unlike
// get_configuration() - confirm this is intended.
uint8_t MavsdkImpl::get_mav_type() const
{
    const uint8_t configured_mav_type = _configuration.get_mav_type();
    return configured_mav_type;
}

// Creates a new System for `system_id`, initializes it with `comp_id` as its
// first component and appends it to _systems. Does nothing once _should_exit
// is set.
void MavsdkImpl::make_system_with_component(uint8_t system_id, uint8_t comp_id)
{
    // Needs _systems_lock

    // When the system got destroyed in the destructor, we have to give up.
    if (_should_exit) {
        return;
    }

    const int system_id_as_int = static_cast<int>(system_id);
    const int comp_id_as_int = static_cast<int>(comp_id);

    const bool is_placeholder = (system_id_as_int == 0 && comp_id_as_int == 0);
    if (is_placeholder) {
        LogDebug() << "Initializing connection to remote system...";
    } else {
        LogDebug() << "New system ID: " << system_id_as_int << " Comp ID: " << comp_id_as_int;
    }

    // Make a system with its first component
    auto created_system = std::make_shared<System>(*this);
    created_system->init(system_id, comp_id);

    _systems.emplace_back(system_id, created_system);
}

// Schedules every subscribed new-system callback on the user callback queue.
void MavsdkImpl::notify_on_discover()
{
    // Queue the callbacks without holding the mutex to avoid deadlocks
    _new_system_callbacks.queue(
        [this](const auto& subscriber) { call_user_callback(subscriber); });
}

// Schedules every subscribed new-system callback on the user callback queue;
// the same subscriber list is used as for notify_on_discover().
void MavsdkImpl::notify_on_timeout()
{
    // Queue the callbacks without holding the mutex to avoid deadlocks
    _new_system_callbacks.queue(
        [this](const auto& subscriber) { call_user_callback(subscriber); });
}

// Registers `callback` as a new-system subscriber and returns its handle.
// If any system is already connected, the subscribed callbacks are queued
// right away.
Mavsdk::NewSystemHandle
MavsdkImpl::subscribe_on_new_system(const Mavsdk::NewSystemCallback& callback)
{
    std::lock_guard guard(_mutex);

    const auto subscription_handle = _new_system_callbacks.subscribe(callback);

    if (!is_any_system_connected()) {
        return subscription_handle;
    }

    _new_system_callbacks.queue(
        [this](const auto& subscriber) { call_user_callback(subscriber); });
    return subscription_handle;
}

// Removes the new-system subscription identified by `handle`.
// Unlike subscribe_on_new_system(), this does not take _mutex itself.
void MavsdkImpl::unsubscribe_on_new_system(Mavsdk::NewSystemHandle handle)
{
    _new_system_callbacks.unsubscribe(handle);
}

bool MavsdkImpl::is_any_system_connected() const
{
    std::vector<std::shared_ptr<System>> connected_systems = systems();
    return std::any_of(connected_systems.cbegin(), connected_systems.cend(), [](auto& system) {
        return system->is_connected();
    });
}

// Main worker loop; runs until _should_exit is set.
//
// Each iteration processes the received messages (both kinds), runs the
// timeout and call-every handlers once, lets every registered server
// component do its work, delivers outgoing messages and finally waits for up
// to 10 ms for new received messages (or the exit flag) before looping again.
void MavsdkImpl::work_thread()
{
    while (!_should_exit) {
        // Process incoming messages
        process_messages();

        // Process incoming libmav messages
        process_libmav_messages();

        // Run timers
        timeout_handler.run_once();
        call_every_handler.run_once();

        // Do component work
        {
            // The lock is scoped so that it is released again before the
            // outgoing messages are delivered below.
            std::lock_guard lock(_server_components_mutex);
            for (auto& it : _server_components) {
                // Entries without a component are skipped.
                if (it.second != nullptr) {
                    it.second->_impl->do_work();
                }
            }
        }

        // Deliver outgoing messages
        deliver_messages();

        // If no messages to send, check if there are messages to receive
        std::unique_lock lock_received(_received_messages_mutex);
        if (_received_messages.empty()) {
            // No messages to process, wait for a signal or timeout.
            // The predicate ends the wait early once a message has arrived or
            // shutdown was requested; otherwise the wait ends after 10 ms.
            _received_messages_cv.wait_for(lock_received, std::chrono::milliseconds(10), [this]() {
                return !_received_messages.empty() || _should_exit;
            });
        }
    }
}

// Enqueues `func` for execution on the user callback thread.
//
// `filename` and `linenumber` identify the call site; they are only stored
// when callback debugging is enabled. Nothing is enqueued while shutting down
// or once the queue already holds 100 or more entries. A warning is logged
// when the queue holds 10 entries and an error when it holds 99.
void MavsdkImpl::call_user_callback_located(
    const std::string& filename, const int linenumber, const std::function<void()>& func)
{
    // Don't enqueue callbacks if we're shutting down
    if (_should_exit) {
        return;
    }

    auto callback_size = _user_callback_queue.size();
    if (callback_size == 10) {
        LogWarn()
            << "User callback queue too slow.\n"
               "See: https://mavsdk.mavlink.io/main/en/cpp/troubleshooting.html#user_callbacks";

    } else if (callback_size == 99) {
        LogErr()
            << "User callback queue overflown\n"
               "See: https://mavsdk.mavlink.io/main/en/cpp/troubleshooting.html#user_callbacks";

    } else if (callback_size >= 100) {
        // Use >= rather than ==: the size check and the push below are not one
        // atomic step, so concurrent callers can take the queue past 100. With
        // an exact comparison the cap would then never trigger again.
        return;
    }

    // We only need to keep track of filename and linenumber if we're actually debugging this.
    UserCallback user_callback =
        _callback_debugging ? UserCallback{func, filename, linenumber} : UserCallback{func};

    _user_callback_queue.push_back(std::make_shared<UserCallback>(std::move(user_callback)));
}

// Worker loop of the user callback thread; runs until _should_exit is set.
//
// Pops one queued callback at a time and executes it. While the callback
// runs, a 1 second timeout is registered with timeout_handler: if it fires, a
// warning is logged, and with callback debugging enabled the process is
// additionally aborted after logging the callback's call site.
void MavsdkImpl::process_user_callbacks_thread()
{
    while (!_should_exit) {
        UserCallback callback;
        {
            // The guard is limited to this scope, so it is gone again before
            // the callback is executed below.
            LockedQueue<UserCallback>::Guard guard(_user_callback_queue);
            auto ptr = guard.wait_and_pop_front();
            if (!ptr) {
                // Nothing was popped; go back and re-check _should_exit.
                continue;
            }
            // We need to get a copy instead of just a shared_ptr because the queue might
            // be invalidated when the lock is released.
            callback = *ptr;
        }

        // Check if we're in the process of shutting down before executing the callback
        if (_should_exit) {
            continue;
        }

        const double timeout_s = 1.0;
        // The lambda captures `callback` and `timeout_s` by reference; it is
        // removed from the handler again below, before both go out of scope.
        auto cookie = timeout_handler.add(
            [&]() {
                if (_callback_debugging) {
                    LogWarn() << "Callback called from " << callback.filename << ":"
                              << callback.linenumber << " took more than " << timeout_s
                              << " second to run.";
                    // Flush the output streams before aborting.
                    fflush(stdout);
                    fflush(stderr);
                    abort();
                } else {
                    LogWarn()
                        << "Callback took more than " << timeout_s << " second to run.\n"
                        << "See: https://mavsdk.mavlink.io/main/en/cpp/troubleshooting.html#user_callbacks";
                }
            },
            timeout_s);
        callback.func();
        // The callback has returned, so the timeout is no longer needed.
        timeout_handler.remove(cookie);
    }
}

void MavsdkImpl::start_sending_heartbeats()
{
    // Check if we're in the process of shutting down
    if (_should_exit) {
        return;
    }

    // Before sending out first heartbeats we need to make sure we have a
    // default server component.
    default_server_component_impl();

    {
        std::lock_guard<std::mutex> lock(_heartbeat_mutex);
        call_every_handler.remove(_heartbeat_send_cookie);
        _heartbeat_send_cookie =
            call_every_handler.add([this]() { send_heartbeats(); }, HEARTBEAT_SEND_INTERVAL_S);
    }
}

void MavsdkImpl::stop_sending_heartbeats()
{
    if (!_configuration.get_always_send_heartbeats()) {
        std::lock_guard<std::mutex> lock(_heartbeat_mutex);
        call_every_handler.remove(_heartbeat_send_cookie);
    }
}

// Locking wrapper around default_server_component_with_lock(): takes
// _server_components_mutex and returns the default server component's impl.
ServerComponentImpl& MavsdkImpl::default_server_component_impl()
{
    std::lock_guard components_lock(_server_components_mutex);

    ServerComponentImpl& default_impl = default_server_component_with_lock();
    return default_impl;
}

// Returns the impl of the default server component. If no default component
// is set yet, it is first looked up via server_component_by_id_with_lock()
// using our own component ID. Takes no lock itself.
ServerComponentImpl& MavsdkImpl::default_server_component_with_lock()
{
    const bool has_default = (_default_server_component != nullptr);
    if (!has_default) {
        _default_server_component = server_component_by_id_with_lock(_our_component_id);
    }

    auto& default_impl = *_default_server_component->_impl;
    return default_impl;
}

// Sends one heartbeat from every registered (non-null) server component while
// holding _server_components_mutex.
void MavsdkImpl::send_heartbeats()
{
    std::lock_guard components_lock(_server_components_mutex);

    for (auto& entry : _server_components) {
        auto& component = entry.second;
        if (component == nullptr) {
            continue;
        }
        component->_impl->send_heartbeat();
    }
}

// Installs `callback` as the interception callback for incoming messages,
// replacing any previously installed one. The assignment happens under
// _intercept_callbacks_mutex.
void MavsdkImpl::intercept_incoming_messages_async(std::function<bool(mavlink_message_t&)> callback)
{
    std::lock_guard<std::mutex> lock(_intercept_callbacks_mutex);
    // `callback` is our own by-value copy: move it into place instead of
    // copying the std::function a second time.
    _intercept_incoming_messages_callback = std::move(callback);
}

// Installs `callback` as the interception callback for outgoing messages,
// replacing any previously installed one. The assignment happens under
// _intercept_callbacks_mutex.
void MavsdkImpl::intercept_outgoing_messages_async(std::function<bool(mavlink_message_t&)> callback)
{
    std::lock_guard<std::mutex> lock(_intercept_callbacks_mutex);
    // `callback` is our own by-value copy: move it into place instead of
    // copying the std::function a second time.
    _intercept_outgoing_messages_callback = std::move(callback);
}

// Invokes every callback in `callback_list` with `json_message` while holding
// _json_subscriptions_mutex.
//
// Returns true only if all callbacks returned true. Every callback is called,
// even after one of them has already returned false.
bool MavsdkImpl::call_json_interception_callbacks(
    const Mavsdk::MavlinkMessage& json_message,
    std::vector<std::pair<Mavsdk::InterceptJsonHandle, Mavsdk::InterceptJsonCallback>>&
        callback_list)
{
    std::lock_guard<std::mutex> subscriptions_lock(_json_subscriptions_mutex);

    bool all_accepted = true;
    for (const auto& entry : callback_list) {
        const auto& interceptor = entry.second;
        // Call first, combine afterwards, so that no callback gets skipped.
        const bool accepted = interceptor(json_message);
        all_accepted = all_accepted && accepted;
    }

    return all_accepted;
}

// Registers `callback` for incoming JSON messages under a newly created handle
// and returns that handle.
Mavsdk::InterceptJsonHandle
MavsdkImpl::subscribe_incoming_messages_json(const Mavsdk::InterceptJsonCallback& callback)
{
    std::lock_guard<std::mutex> subscriptions_lock(_json_subscriptions_mutex);

    auto new_handle = _json_handle_factory.create();
    _incoming_json_message_subscriptions.emplace_back(new_handle, callback);

    return new_handle;
}

// Removes the first incoming-JSON subscription whose handle equals `handle`;
// does nothing if there is none.
void MavsdkImpl::unsubscribe_incoming_messages_json(Mavsdk::InterceptJsonHandle handle)
{
    std::lock_guard<std::mutex> subscriptions_lock(_json_subscriptions_mutex);

    auto& subscriptions = _incoming_json_message_subscriptions;
    for (auto it = subscriptions.begin(); it != subscriptions.end(); ++it) {
        if (it->first == handle) {
            subscriptions.erase(it);
            return;
        }
    }
}

// Registers `callback` for outgoing JSON messages under a newly created handle
// and returns that handle.
Mavsdk::InterceptJsonHandle
MavsdkImpl::subscribe_outgoing_messages_json(const Mavsdk::InterceptJsonCallback& callback)
{
    std::lock_guard<std::mutex> subscriptions_lock(_json_subscriptions_mutex);

    auto new_handle = _json_handle_factory.create();
    _outgoing_json_message_subscriptions.emplace_back(new_handle, callback);

    return new_handle;
}

// Removes the first outgoing-JSON subscription whose handle equals `handle`;
// does nothing if there is none.
void MavsdkImpl::unsubscribe_outgoing_messages_json(Mavsdk::InterceptJsonHandle handle)
{
    std::lock_guard<std::mutex> subscriptions_lock(_json_subscriptions_mutex);

    auto& subscriptions = _outgoing_json_message_subscriptions;
    for (auto it = subscriptions.begin(); it != subscriptions.end(); ++it) {
        if (it->first == handle) {
            subscriptions.erase(it);
            return;
        }
    }
}

// Registers `callback` for connection errors (under _mutex) and returns the
// handle of the new subscription.
Mavsdk::ConnectionErrorHandle
MavsdkImpl::subscribe_connection_errors(Mavsdk::ConnectionErrorCallback callback)
{
    std::lock_guard guard(_mutex);
    return _connections_errors_subscriptions.subscribe(callback);
}

// Removes the connection-error subscription identified by `handle` (under
// _mutex).
void MavsdkImpl::unsubscribe_connection_errors(Mavsdk::ConnectionErrorHandle handle)
{
    std::lock_guard guard(_mutex);

    _connections_errors_subscriptions.unsubscribe(handle);
}

uint8_t MavsdkImpl::get_target_system_id(const mavlink_message_t& message)
{
    // Checks whether connection knows target system ID by extracting target system if set.
    const mavlink_msg_entry_t* meta = mavlink_get_msg_entry(message.msgid);

    if (meta == nullptr || !(meta->flags & MAV_MSG_ENTRY_FLAG_HAVE_TARGET_SYSTEM)) {
        return 0;
    }

    // Don't look at the target system offset if it is outside the payload length.
    // This can happen if the fields are trimmed.
    if (meta->target_system_ofs >= message.len) {
        return 0;
    }

    return (_MAV_PAYLOAD(&message))[meta->target_system_ofs];
}

uint8_t MavsdkImpl::get_target_component_id(const mavlink_message_t& message)
{
    // Checks whether connection knows target system ID by extracting target system if set.
    const mavlink_msg_entry_t* meta = mavlink_get_msg_entry(message.msgid);

    if (meta == nullptr || !(meta->flags & MAV_MSG_ENTRY_FLAG_HAVE_TARGET_COMPONENT)) {
        return 0;
    }

    // Don't look at the target component offset if it is outside the payload length.
    // This can happen if the fields are trimmed.
    if (meta->target_component_ofs >= message.len) {
        return 0;
    }

    return (_MAV_PAYLOAD(&message))[meta->target_component_ofs];
}

// Returns the sender of the default server component; the component is looked
// up while holding _server_components_mutex.
Sender& MavsdkImpl::sender()
{
    std::lock_guard components_lock(_server_components_mutex);

    auto& default_component = default_server_component_with_lock();
    return default_component.sender();
}

std::vector<Connection*> MavsdkImpl::get_connections() const
{
    std::lock_guard lock(_mutex);
    std::vector<Connection*> connections;
    for (const auto& connection_entry : _connections) {
        connections.push_back(connection_entry.connection.get());
    }
    return connections;
}

// Returns a reference to the MessageSet. No lock is taken here.
mav::MessageSet& MavsdkImpl::get_message_set() const
{
    // Because the reference is handed out unlocked, thread safety has to be
    // ensured by other means:
    // 1. Write operations (XML loading) go through load_custom_xml_to_message_set().
    // 2. libmav MessageSet should be internally thread-safe for read operations.
    // 3. If race conditions persist, consider implementing a thread-safe
    //    MessageSet wrapper.
    mav::MessageSet& message_set = *_message_set;
    return message_set;
}

bool MavsdkImpl::load_custom_xml_to_message_set(const std::string& xml_content)
{
    std::lock_guard<std::mutex> lock(_message_set_mutex);
    auto result = _message_set->addFromXMLString(xml_content, false /* recursive_open_includes */);
    return result == ::mav::MessageSetResult::Success;
}

// Thread-safe MessageSet read operations
std::optional<std::string> MavsdkImpl::message_id_to_name_safe(uint32_t id) const
{
    std::lock_guard<std::mutex> lock(_message_set_mutex);
    auto message_def = _message_set->getMessageDefinition(static_cast<int>(id));
    if (message_def) {
        return message_def.get().name();
    }
    return std::nullopt;
}

std::optional<int> MavsdkImpl::message_name_to_id_safe(const std::string& name) const
{
    std::lock_guard<std::mutex> lock(_message_set_mutex);
    return _message_set->idForMessage(name);
}

std::optional<mav::Message> MavsdkImpl::create_message_safe(const std::string& message_name) const
{
    std::lock_guard<std::mutex> lock(_message_set_mutex);
    return _message_set->create(message_name);
}

// Thread-safe parsing for LibmavReceiver
std::optional<mav::Message> MavsdkImpl::parse_message_safe(
    const uint8_t* buffer, size_t buffer_len, size_t& bytes_consumed) const
{
    std::lock_guard<std::mutex> lock(_message_set_mutex);
    return _buffer_parser->parseMessage(buffer, buffer_len, bytes_consumed);
}

// Looks up the definition for `message_id` while holding _message_set_mutex.
// NOTE(review): the returned reference is used after the lock is released -
// confirm that definitions stay valid once added to the MessageSet.
mav::OptionalReference<const mav::MessageDefinition>
MavsdkImpl::get_message_definition_safe(int message_id) const
{
    std::lock_guard<std::mutex> message_set_lock(_message_set_mutex);

    return _message_set->getMessageDefinition(message_id);
}

} // namespace mavsdk
